# -*- coding: utf-8 -*-
from datetime import timedelta
from utils.operators.cluster_for_spark_sql_operator import SparkSqlOperator
from utils.alerts.yl_threeSegCodeOnFailue import yl_threeSegCodeOnFailure
from utils.alerts.yl_threeSegCodeOnSuccess import yl_threeSegCodeOnSuccess

from jms.aigroup.ai_dwd.train_sample_daily_bak import jms_ai_dwd__train_sample_bak
from jms.aigroup.ai_dwd.yl_ml_clean_address_new_day import jms_ai_dwd__yl_ml_clean_address_new_day
from jms.aigroup.ai_group.geo_info_day import jms_aigroup__geo_info_day
from jms.dwd.tab.dwd_tab_barscan_collect_base_dt import jms_dwd__dwd_tab_barscan_collect_base_dt
def kwargs():
    """Return the alert metadata for this task.

    A new dict is built on every call. In this module the result is handed
    to the success and failure alert callbacks of the Spark SQL task.

    NOTE(review): "table" is "train_sample" while the task itself is named
    "..._train_sample_new" -- presumably intentional; confirm with the owner.
    """
    return {
        # Target database / table the alert refers to.
        "db": "ai_group",
        "table": "train_sample",
        # Human-readable description (runtime string, kept verbatim).
        "desc": "模型训练经纬度地址数据",
        "taskid": "10060",
        # Integer flags consumed by the alert helpers.
        "ifprivacy": 0,
        "warnignore": 0,
    }


# Spark SQL task that executes jms/aigroup/ai_group/train_sample_new/execute.sql.
# Success and failure are reported through the two alert callbacks at the end
# of the call, each fed the metadata dict returned by kwargs().
jms_ai_group__train_sample_new = SparkSqlOperator(
    task_id='jms_ai_group__train_sample_new',
    email=['yushuo@jtexpress.com', 'yl_bigdata@yl-scm.com'],
    name='jms_ai_group__train_sample_new_{{ execution_date | date_add(1) | cst_ds }}',
    # NOTE(review): with retries=0 the task is never retried, so
    # email_on_retry presumably has no effect at the moment.
    email_on_retry=True,
    retries=0,
    sql='jms/aigroup/ai_group/train_sample_new/execute.sql',
    yarn_queue='pro',
    driver_memory='4G',
    driver_cores=4,
    executor_memory='10G',
    executor_cores=4,
    pool_slots=2,
    pool='unlimited_pool',
    # Original author's note: when spark.dynamicAllocation.enabled is true,
    # num_executors is the minimum number of executors.
    # NOTE(review): stock Spark treats --num-executors as the *initial* count
    # under dynamic allocation; confirm how SparkSqlOperator maps this value.
    num_executors=90,
    conf={
        'spark.executor.memoryOverhead': '4G',
        # Spark property names are case-sensitive: keep the camelCase
        # spelling, a lower-cased variant of this key is silently ignored.
        'spark.dynamicAllocation.enabled': 'true',
        'spark.shuffle.service.enabled': 'true',
        'spark.dynamicAllocation.maxExecutors': 100,
        'spark.sql.sources.partitionOverwriteMode': 'dynamic',
        'spark.dynamicAllocation.cachedExecutorIdleTimeout': 120,
        # spark.sql.shuffle.partitions only applies to Spark SQL shuffles.
        'spark.sql.shuffle.partitions': 1000,
        # NOTE(review): legacy memory setting; since Spark 1.6 it is only
        # read in legacy memory mode, which Spark 3.x removed -- confirm the
        # cluster version before relying on it.
        'spark.shuffle.memoryFraction': '0.8',
        'spark.executor.extraJavaOptions': '-XX:+UseG1GC -XX:ParallelGCThreads=4',
    },
    # Hive settings enabling non-strict dynamic partition writes, capped at
    # 400 partitions both per node and overall.
    hiveconf={
        'hive.exec.dynamic.partition': 'true',
        'hive.exec.dynamic.partition.mode': 'nonstrict',
        'hive.exec.max.dynamic.partitions.pernode': 400,
        'hive.exec.max.dynamic.partitions': 400,
    },
    execution_timeout=timedelta(hours=2),
    priority_weight=10,
    # Info and alert notifications use separate dingding connection ids.
    on_success_callback=yl_threeSegCodeOnSuccess(kwargs(), dingding_conn_id="dingding_ThreeSeg_etl_info"),
    on_failure_callback=yl_threeSegCodeOnFailure(kwargs(), dingding_conn_id="dingding_ThreeSeg_etl_alert"),
)
# Declare the four imported tasks as upstream dependencies of this task.
jms_ai_group__train_sample_new << [
    jms_ai_dwd__train_sample_bak,
    jms_ai_dwd__yl_ml_clean_address_new_day,
    jms_aigroup__geo_info_day,
    jms_dwd__dwd_tab_barscan_collect_base_dt,
]